I'm going to show you how I've been collecting Divvy bike data, with the goal of getting probability distributions for every bike dock in Chicago for every minute of both weekdays and weekends. Ultimately I'd like to build some sort of visualization of how the availability of bikes and docks changes over time at each station. In this article I'll get to the point where I can make some predictions about how many bikes are available at this exact minute.
What are Divvy bikes? Divvy is a bike sharing system in use throughout Chicago. Every minute they post data regarding the status of every bike dock in the city to an open API. You can learn more about Divvy bikes here: http://divvybikes.com
I've been collecting data for the Divvy bikes throughout Chicago since September with my friend Andrew Slotnick. He showed me that it's a pretty simple API to scrape. Just hit this URL and record the results every minute:
http://divvybikes.com/stations/json
I have a Mac Mini in my living room that my fiancée watches TV shows on while I run processes to scrape data sources. It's all about synergy!
So on this multi-tasking Mac Mini I have the following Python script running:
In [ ]:
import urllib
import os, time, socket, datetime, glob

# Don't let a hung request block the loop forever
socket.setdefaulttimeout(20)

def group_previous_days_files():
    # Roll every minute file from previous days into a single day_YYYYMMDD.json,
    # push the day files to S3, then clean up the local copies.
    print "Attempting grouping..."
    todays_group = datetime.datetime.now().strftime('%Y%m%d')
    print "Today's group is " + todays_group
    log_filenames = [file_path for file_path in glob.glob("./DownloadedData/*.json")
                     if not todays_group in file_path and not 'day_' in file_path]
    print "Logs found: " + str(log_filenames)
    for log_filename in log_filenames:
        print "Grouping " + log_filename
        base_log_filename = os.path.basename(log_filename)
        the_date = base_log_filename.split('_')[0]
        grouped_file_path = data_folder + 'day_' + the_date + '.json'
        with open(grouped_file_path, 'a') as day_file:
            with open(log_filename, 'r') as minute_file:
                minute_text = minute_file.read()
                day_file.write(minute_text)
        print "Removing " + log_filename
        os.remove(log_filename)
    os.system('s3cmd sync ./DownloadedData/day_*.json s3://bozo-divvy/day-log/')
    print "Done uploading to S3"
    for day_file in glob.glob('./DownloadedData/day_*.json'):
        os.remove(day_file)

data_folder = './DownloadedData/'

while True:
    # One file per minute, named like 20140118_1305.json
    current_file_name = datetime.datetime.now().strftime('%Y%m%d_%H%M') + '.json'
    current_file_path = data_folder + current_file_name
    if not os.path.exists(current_file_path):
        print "Downloading file... then sleeping."
        try:
            urllib.urlretrieve("http://divvybikes.com/stations/json", current_file_path)
        except:
            # Network hiccup: wait a few seconds and try again
            time.sleep(3)
            continue
        group_previous_days_files()
    else:
        print "File already exists. Sleeping"
    time.sleep(30)
This script downloads the current station JSON to a new timestamped file every minute, rolls the previous days' minute files into single day_YYYYMMDD.json files, syncs those day files to S3 with s3cmd, and then deletes the local copies.
If you've seen Hilary Mason talk about the data science workflow, you might recognize this as the "Obtain" and "Scrub" steps.
This gives data in this format:
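Roughly speaking, each minute's payload is one JSON object. The field names below are the ones my mapper reads later; the values are made up and most of the feed's extra fields are trimmed out:

{
    "executionTime": "2014-01-18 1:05:00 PM",
    "stationBeanList": [
        {
            "id": 123,
            "stationName": "Some Station Name",
            "latitude": 41.936497,
            "longitude": -87.647539,
            "totalDocks": 15,
            "availableDocks": 9,
            "availableBikes": 6
        },
        ...
    ]
}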
The next step is turning those raw minute-by-minute logs into something I can actually use.
My goal from the get-go with this data has been to answer the question: "How likely am I to find an available bike or dock?"
I'd also love to be able to see this visualized and animated on a map. We'll see about that. First let's answer my burning question. To do that I'll need to design roughly how I want my data to be organized when I get done with it.
What I want is a row of data that lists the bike station, whether or not it's a weekend, and the hour and minute of the day, along with frequency distributions of how often each number of bikes and docks was available. Something like this:
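Here's a made-up sketch of one such record (arrays truncated), using the same field names my job emits below:

{
    "station_id": 123,
    "station_name": "Some Station Name",
    "latitude": 41.936497,
    "longitude": -87.647539,
    "weekday": false,
    "hour": 13,
    "minute": 5,
    "available_bikes": [0, 1, 7, 12, 9, ...],
    "available_docks": [2, 0, 1, 4, 13, ...],
    "total_docks": [0, 0, 0, 0, 0, ...]
}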
That's what I want my final result to look like. Each index in the arrays maps to a hypothesis: for available bikes, zero times there were zero available bikes, one time there was one available bike, seven times there were two available bikes, and so on. To come up with a probability, we just add up all of the elements of the array and divide each element by that sum. The same applies to available docks and total docks.
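As a tiny sketch of that arithmetic, with made-up tallies:

# Hypothetical tallies: 0 minutes with zero bikes, 1 with one bike, 7 with two, 2 with three
available_bikes_frequencies = [0, 1, 7, 2]
total = float(sum(available_bikes_frequencies))
available_bikes_probabilities = [count / total for count in available_bikes_frequencies]
# -> [0.0, 0.1, 0.7, 0.2]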
Two things to keep in mind:
So how do we get from the first data format to this ideal one when we've got gigabytes of data being stored on S3? MRJob.
I could download all of the files to my local computer and run a script locally to do my aggregation. What's great about MRJob is that the same script I write and test locally can run against my data on S3 using Amazon's Elastic MapReduce (EMR). So I'll start by downloading some of the data, running my script locally, and once it seems to be working I'll decide whether it's easier to run it locally or in the cloud.
Let's get started!
If you need a MapReduce intro or refresher my good friend Tom Hayden has a great introduction to MRJob where he analyzes chess data. Check it out here: http://tomhayden3.com/2013/12/27/chess-mr-job/
This example will require us to use some of MRJob's advanced features like multi-step support and protocols.
Above I showed that the data I collect from Divvy is JSON. So rather than importing lines of text and parsing them into Python dictionaries myself, I can have MRJob do that for me with "protocols". Below I show what's necessary to define the input and output protocols for this job.
In [ ]:
from mrjob.job import MRJob
from mrjob import protocol

class AggregateDivvyFrequencies(MRJob):
    INPUT_PROTOCOL = protocol.JSONValueProtocol
    OUTPUT_PROTOCOL = protocol.JSONValueProtocol
    ...
The main ones I use for input and output are protocol.RawValueProtocol and protocol.JSONValueProtocol. RawValueProtocol passes each line straight through as the value and adds no formatting beyond what you explicitly write; it's MRJob's default input protocol. To read more about protocols, check out this link: http://pythonhosted.org/mrjob/guides/writing-mrjobs.html#job-protocols
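To make the distinction concrete, here's roughly how the two protocols treat a single line of input outside of any job (the line itself is made up):

from mrjob import protocol

line = '{"executionTime": "2014-01-18 1:05:00 PM"}'

# JSONValueProtocol parses the line and hands the job (None, parsed_value)
key, value = protocol.JSONValueProtocol().read(line)
# key is None, value is {'executionTime': '2014-01-18 1:05:00 PM'}

# RawValueProtocol passes the raw string through untouched
key, value = protocol.RawValueProtocol().read(line)
# key is None, value is the original string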
In this job I've decided to make my life easier by reading in the lines of text as JSON objects and I personally prefer saving the data as JSON as well.
First I need to tally, for each station, how many times it had a given number of bikes, available docks, and total docks, broken out by weekday vs. weekend, hour, and minute. That essentially means grouping by the station's pertinent information (lat/long, station id, the weekday flag, hour, minute, and the counts themselves) as the key, and emitting a value of 1 to act as a tally of how many times I've seen that exact combination. Each line of Divvy data includes metadata about the logging (such as the time the data was logged) and the data for every single station, so my mapping step emits many key/value pairs for each line of input.
Here's what this step looks like:
In [ ]:
def organize_station_data_and_tally(self, _, station_update):
    # executionTime looks like '2014-01-18 1:05:00 PM' given this format string
    execution_time_text = station_update['executionTime']
    the_datetime = datetime.strptime(execution_time_text, '%Y-%m-%d %I:%M:%S %p')
    the_hour = the_datetime.hour
    the_minute = the_datetime.minute
    # Monday is 0 and Sunday is 6, so an index below 5 means a weekday
    weekday_index = the_datetime.weekday()
    weekday_map = [
        'Monday',
        'Tuesday',
        'Wednesday',
        'Thursday',
        'Friday',
        'Saturday',
        'Sunday'
    ]
    station_list = station_update['stationBeanList']
    for station_info in station_list:
        # One key per station per logged minute; the value of 1 is the tally
        station_data = {
            'station_id': station_info['id'],
            'station_name': station_info['stationName'],
            'latitude': station_info['latitude'],
            'longitude': station_info['longitude'],
            'weekday': weekday_index < 5,
            'hour': the_hour,
            'minute': the_minute,
            'total_docks': station_info['totalDocks'],
            'available_docks': station_info['availableDocks'],
            'available_bikes': station_info['availableBikes']
        }
        yield station_data, 1
Notice that I'm grouping by station_name as well as id. That's more for convenience than anything; I don't believe any station names have changed since I started collecting data, and keeping the name in the key makes sure it carries through to my final results.
MRJob ensures that when it calls our reduce function, we get all of the tallies associated with a given key at once.
In [ ]:
def group_minutes_by_station(self, station_data, tallies):
    tally_total = sum(tallies)
    yield station_data, tally_total
When MRJob calls this, all I need to do is sum up all of the tallies for this key, and then I know how many times this exact combination of weekday, hour, minute, and counts occurred.
Now I have a perfectly valid representation of how often I see these results, but it's a really long, flat list. What I really want is for it to be organized into the kind of frequency distribution you'd expect to see in a histogram.
To do that, I need to move total_docks, available_docks, and available_bikes out of the key and into the value. That way, in my next reduce step, I can take all of the different occurrences of those values and add their tallies into the appropriate buckets.
This is the mapping step in this transformation:
In [ ]:
def organize_into_buckets(self, station_data, tally_total):
    # Pull the counts out of the key and into the value so the next reducer
    # can bucket them; the tally rides along as 'occurences'
    metrics = {
        'total_docks': station_data['total_docks'],
        'available_docks': station_data['available_docks'],
        'available_bikes': station_data['available_bikes'],
        'occurences': tally_total
    }
    del station_data['total_docks']
    del station_data['available_docks']
    del station_data['available_bikes']
    yield station_data, metrics
The final reduce step then collects all of those per-minute metrics for a station and adds each tally into the right bucket of the frequency arrays:
In [ ]:
def create_frequency_distributions(self, station_data, all_station_metrics):
    # Index = count, so 100 buckets covers any realistic number of bikes or docks
    available_bikes_frequencies = [0] * 100
    total_docks_frequencies = [0] * 100
    available_docks_frequencies = [0] * 100
    for station_metric in all_station_metrics:
        available_bikes_frequencies[station_metric['available_bikes']] += station_metric['occurences']
        total_docks_frequencies[station_metric['total_docks']] += station_metric['occurences']
        available_docks_frequencies[station_metric['available_docks']] += station_metric['occurences']
    station_data['available_bikes'] = available_bikes_frequencies
    station_data['available_docks'] = available_docks_frequencies
    station_data['total_docks'] = total_docks_frequencies
    yield None, station_data
Since this is my last step, note how I'm yielding None as the key and combining all of my data into a single object as the value. Because I set my output protocol to JSONValueProtocol, MRJob will automatically serialize my Python dictionary into JSON and write it out to my results files.
Here's the whole thing at once:
In [ ]:
from mrjob.job import MRJob
from mrjob import protocol
from datetime import datetime


class AggregateDivvyFrequencies(MRJob):
    INPUT_PROTOCOL = protocol.JSONValueProtocol
    OUTPUT_PROTOCOL = protocol.JSONValueProtocol

    def steps(self):
        return [self.mr(mapper=self.organize_station_data_and_tally,
                        reducer=self.group_minutes_by_station),
                self.mr(mapper=self.organize_into_buckets,
                        reducer=self.create_frequency_distributions)]

    def organize_station_data_and_tally(self, _, station_update):
        execution_time_text = station_update['executionTime']
        the_datetime = datetime.strptime(execution_time_text, '%Y-%m-%d %I:%M:%S %p')
        the_hour = the_datetime.hour
        the_minute = the_datetime.minute
        weekday_index = the_datetime.weekday()
        weekday_map = [
            'Monday',
            'Tuesday',
            'Wednesday',
            'Thursday',
            'Friday',
            'Saturday',
            'Sunday'
        ]
        station_list = station_update['stationBeanList']
        for station_info in station_list:
            station_data = {
                'station_id': station_info['id'],
                'station_name': station_info['stationName'],
                'latitude': station_info['latitude'],
                'longitude': station_info['longitude'],
                'weekday': weekday_index < 5,
                'hour': the_hour,
                'minute': the_minute,
                'total_docks': station_info['totalDocks'],
                'available_docks': station_info['availableDocks'],
                'available_bikes': station_info['availableBikes']
            }
            yield station_data, 1

    def group_minutes_by_station(self, station_data, tallies):
        tally_total = sum(tallies)
        yield station_data, tally_total

    def organize_into_buckets(self, station_data, tally_total):
        metrics = {
            'total_docks': station_data['total_docks'],
            'available_docks': station_data['available_docks'],
            'available_bikes': station_data['available_bikes'],
            'occurences': tally_total
        }
        del station_data['total_docks']
        del station_data['available_docks']
        del station_data['available_bikes']
        yield station_data, metrics

    def create_frequency_distributions(self, station_data, all_station_metrics):
        available_bikes_frequencies = [0] * 100
        total_docks_frequencies = [0] * 100
        available_docks_frequencies = [0] * 100
        for station_metric in all_station_metrics:
            available_bikes_frequencies[station_metric['available_bikes']] += station_metric['occurences']
            total_docks_frequencies[station_metric['total_docks']] += station_metric['occurences']
            available_docks_frequencies[station_metric['available_docks']] += station_metric['occurences']
        station_data['available_bikes'] = available_bikes_frequencies
        station_data['available_docks'] = available_docks_frequencies
        station_data['total_docks'] = total_docks_frequencies
        yield None, station_data


if __name__ == '__main__':
    AggregateDivvyFrequencies.run()
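With the job saved to a file (I'll call it aggregate_divvy_frequencies.py; the name is up to you), I can test it locally against a few day files pulled down from S3, roughly like this:

python aggregate_divvy_frequencies.py ./DownloadedData/day_*.json > results.txt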
Then when I'm happy I have it working, I can run the entire thing on EMR like so:
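The exact invocation depends on where you keep things; with my inputs synced to s3://bozo-divvy/day-log/ and a hypothetical output path on the same bucket, it looks roughly like:

python aggregate_divvy_frequencies.py -r emr s3://bozo-divvy/day-log/ --output-dir=s3://bozo-divvy/results/ --no-output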
This is a great chance to point out an awesome feature of MRJob. See above how I specify the output directory with '--output-dir'? I love that I don't have to tell MRJob I'm pulling data from S3 for the EMR version; it just infers it from the path I provide. I also keep an MRJob config file in one of MRJob's default paths so I don't have to pass it in as a parameter every time I run a job. More about that here: http://pythonhosted.org/mrjob/configs.html
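For reference, that config file lives at one of the default locations (e.g. ~/.mrjob.conf) and contains something along these lines (the keys and values shown here are illustrative, not my actual settings):

runners:
  emr:
    aws_access_key_id: YOUR_KEY
    aws_secret_access_key: YOUR_SECRET
    ec2_instance_type: m1.small
    num_ec2_instances: 4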
Here's what working with the results looks like. Each line of the results file is one JSON object in the format I sketched earlier; to put it to use, let's first find the station closest to my apartment:
In [1]:
import json
import math

results_filepath = '/Users/justin/Documents/Code/divvy_probability/results.txt'

my_longitude = -87.664362
my_latitude = 42.007758

closest_station = None
closest_station_distance = float('inf')

with open(results_filepath, 'r') as results_file:
    for line in results_file:
        station_data = json.loads(line)
        station_latitude = station_data['latitude']
        station_longitude = station_data['longitude']
        # Straight-line distance in degrees is good enough to rank nearby stations
        distance = math.sqrt(math.pow(station_latitude - my_latitude, 2) + math.pow(station_longitude - my_longitude, 2))
        if distance < closest_station_distance:
            closest_station = {'id': station_data['station_id'], 'name': station_data['station_name']}
            closest_station_distance = distance

print closest_station
Seems legit.
Right now it's just after 1 PM on a weekend... Let's see how likely it is that there are bikes available at 1:05 PM.
In [65]:
import numpy

matching_station = None
current_frequencies = numpy.array([])
current_probabilities = numpy.array([])

with open(results_filepath, 'r') as results_file:
    for line in results_file:
        station_data = json.loads(line)
        # Closest station, on a weekend (weekday == False), at 1:05 PM
        if (station_data['station_id'] == closest_station['id']
                and station_data['weekday'] == False
                and station_data['hour'] == 13
                and station_data['minute'] == 5):
            matching_station = station_data
            current_frequencies = numpy.array(station_data['available_bikes'])
            break

# Normalize the frequencies into probabilities
current_probabilities = current_frequencies / (1.0 * sum(current_frequencies))

print "Probability of number of bikes available"
for i in range(0, 16):
    print str(i) + ': ' + str(100 * current_probabilities[i]) + '%'
If we assume the frequencies are directly interpretable as probabilities, then there is practically a 100% chance there will be a bike available. There is also an 84.375% chance that there are between 3 and 8 bikes available.
Looking at this, it says it's IMPOSSIBLE that there will be no bikes available. That seems optimistic to me. Since I've never actually observed zero bikes for this station and minute, one way to get at it is to model the unknown probability of "no bikes" as a Beta(1, 1 + N) distribution, where N is the number of observations: a uniform prior updated with N observations that all had at least one bike. Let's sample from that and see what the probability distribution looks like for there NOT being a bike available.
In [100]:
import numpy
import matplotlib.pyplot as plt

# Draw 1000 samples from Beta(1, 1 + N), scaled to percentage points
n_observations = sum(current_frequencies)
samples = map(lambda x: math.floor(100 * numpy.random.beta(1, 1 + n_observations)), numpy.arange(0, 1000))
histogram = numpy.histogram(samples, bins=range(0, 100))
likelihood_no_bikes = histogram[0] / (1.0 * sum(histogram[0]))
print likelihood_no_bikes
plt.hist(samples, bins=range(0, 100))
Out[100]:
From this we can count up the frequencies of our hypotheses and conclude that a credible interval for the probability of finding no bikes is roughly 0-7%.
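One way to pull that upper bound out of the samples (which are already in percentage points) is to look at a high percentile of the sampled probabilities:

# Upper end of a one-sided 95% credible interval for "no bikes available"
print numpy.percentile(samples, 95)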
Now what? Well, the next step will be to let people explore this data, and its current form doesn't do much to help with that. I'd like to turn this into something my fiancée would actually be interested to see, and a solid visualization might do the trick.
Sounds like a great topic for my next blog post. Thanks for reading!